Creating the Conversation Target
After you create the initiator, the next step is to create the service program in XCatMgmt
that is activated when messages arrive. This step involves a bit more
work because this program needs to receive messages of at least three
types (Error, EndDialog, and CatalogChangeMessage), create and send acknowledgment messages, and perform local database DML.
The example in Listing 4
contains a stored procedure that receives correlated messages from the
initiator. Note that this procedure is the same (empty) one you
specified in Listing 2.
Listing 4. Using GET CONVERSATION DIALOG, RECEIVE, and SEND ON CONVERSATION in an Service Broker–Activated Stored Procedure
use XCatMgmt GO DROP PROC Publication.CatalogChangeQueueReader GO CREATE PROCEDURE Publication.CatalogChangeQueueReader AS DECLARE @e int, @r int, @MsgTypeName nvarchar(128), @desc varchar(255), @MsgXml xml, @AckXml xml (DOCUMENT dbo.GenericAcknowledgementSchema), @RemoteSSBGuid uniqueidentifier, @ErrNS varchar(150), @EndDlgNS varchar(150), @CatChangeNS varchar(150), @TempXml xml, @NewName varchar(100), @SourceProductId int, @ChangeType int, @ConversationGroupId uniqueidentifier, @DialogHandle uniqueidentifier
SET @ErrNS = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error' SET @EndDlgNS = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog' SET @CatChangeNS = '//samspublishing.com/SS2008/SSB/MessageTypes/ CatalogChangeMessage'
-- Get the SSB guid for the initiator's db SELECT @RemoteSSBGuid = service_broker_guid FROM sys.databases WHERE name = 'AdventureWorks2008';
BEGIN TRAN; WAITFOR( GET CONVERSATION GROUP @ConversationGroupId FROM Publication.CatalogChangeQueue ), TIMEOUT 1000
IF @ConversationGroupId IS NULL BEGIN ROLLBACK TRAN PRINT '(Target) ConversationGroupId not acquired in time.' RETURN END ELSE PRINT '(Target) ConversationGroupId acquired successfully.';
RECEIVE TOP(1) @MsgXml = CAST(message_body as xml), @MsgTypeName = message_type_name, @DialogHandle = conversation_handle FROM Publication.CatalogChangeQueue WHERE conversation_group_id = @ConversationGroupId
SELECT @e = @@ERROR, @r = @@ROWCOUNT IF @r = 0 BEGIN ROLLBACK TRAN RETURN END
IF @e != 0 BEGIN ROLLBACK TRAN PRINT '(Target) Error during receive.' RETURN END ELSE PRINT '(Target) Message received.'
-- if the msg is of type Error, end the conversation, stating the error IF @MsgTypeName = @ErrNS BEGIN SELECT @desc = @MsgXml.value(' declare default element namespace "http://schemas.microsoft.com/SQL/ServiceBroker/Error"; (/Error/Description/text())[1]', 'varchar(255)')
ROLLBACK TRAN PRINT '(Target) Error message received.' END CONVERSATION @DialogHandle WITH ERROR = 808 DESCRIPTION = @desc RETURN END -- if the msg is of type EndDialog, end the conversation without error IF @MsgTypeName = @EndDlgNS BEGIN PRINT '(Target) EndDialog message received.'; END CONVERSATION @DialogHandle RETURN END
-- if the msg is of type CatalogChangeMessage, update appropriately IF @MsgTypeName = @CatChangeNS BEGIN BEGIN TRY -- what kind of change is requested? -- (here we only deal with product name and xml changes) ;WITH XMLNAMESPACES ( DEFAULT 'urn:www-samspublishing-com:examples:ssb:catalogchange' ) SELECT @ChangeType = @MsgXml.value(' (/CatalogChangeMessage/CatalogChange/@ChangeType)[1]', 'int'), @NewName = @MsgXml.value(' (/CatalogChangeMessage/CatalogChange/@Name)[1]', 'varchar(100)'), @SourceProductId = @MsgXml.value(' (/CatalogChangeMessage/CatalogChange/@SourceProductId)[1]', 'int')
IF @ChangeType IS NULL OR @NewName IS NULL OR @SourceProductId IS NULL BEGIN ROLLBACK TRAN PRINT '(Target) An xml-selected value is NULL.' RETURN END
IF @ChangeType = 2 -- "Update" BEGIN UPDATE Publication.Product SET ProductName = @NewName, ProductDetailXml = @MsgXml WHERE @SourceProductId = SourceProductId IF @@ERROR != 0 OR @@ROWCOUNT = 0 BEGIN ROLLBACK TRAN PRINT '(Target) Failure during table update.' RETURN END SET @AckXml = ' <Ack xmlns="urn:www-samspublishing-com:examples:ssb:genericack" ResultCode="1"> <ResultMessage ContentId="' + CAST(@SourceProductId as varchar(10)) + '" MsgType="1">Success!</ResultMessage></Ack>';
SEND ON CONVERSATION @DialogHandle MESSAGE TYPE [//samspublishing.com/SS2008/SSB/MessageTypes/GenericAck] (@AckXml) PRINT '(Target) Message Sent Successfully.' END END TRY BEGIN CATCH ROLLBACK TRAN SELECT @desc = ERROR_MESSAGE() -- INSERT dbo.TargetErrs SELECT @desc -- simple error storage table PRINT '(Target) Caught error:' + @desc END CONVERSATION @DialogHandle WITH ERROR = 808 DESCRIPTION = @desc RETURN END CATCH END COMMIT TRAN
|
One issue you might notice when testing the code in Listing 4 is that the PRINT
statements in the activated procedure do not show up in the SSMS query
window. You need to use SQL Profiler to aid in debugging activated code because it
always runs on background threads. To help you out, a Service Broker
event group is available in SQL Profiler for tracing all the new Service
Broker events’ message transmission, activation, conversation beginning
and ending, and so on. You can use this new event group along with the
T-SQL and stored procedure event groups to trace the code path. Print
statements are included in the code in Listing 49.5 to make debugging easier.
The code in Listing 49.5 introduces three new SQL statements: GET CONVERSATION DIALOG, RECEIVE, and GET CONVERSATION GROUP.
The purpose of GET CONVERSATION DIALOG is to lock the next available conversation group associated with the messages in Publication.CatalogChangeQueue. Conventional use of GET CONVERSATION DIALOG requires that the WAITFOR statement be used to make the initiated program wait a specified number of milliseconds (or infinitely, as specified in TIMEOUT) before continuing on in the program logic. If the conversation group ID has been received
within the specified time, the code successfully locks that
conversation group (that is, locks the specified queue for receiving).
If not, @ConversationGroupId is NULL.
After this call, the program attempts to receive the next message in the queue by using the RECEIVE statement, whose syntax is similar to that of SELECT, except that instead of specifying a table name, you specify a queue name.
Next, the code checks the received message type and takes the appropriate action. If the message type is Error, it ends the dialog, reporting the error. If the message type is EndDialog, it simply ends its side of the dialog. If it is a catalog change message, it updates Publication.Product so that the related row in Publication.ProductCatalog (which associates products with catalogs) now points to the newest data.
Just as with the initiator code, the target code first declares its outgoing messages as a typed XML variable (@AckXml). This helps in making sure that the outgoing message will be received without error.
One issue to be mindful of
is that all this code is executing in the scope of a single transaction.
If any part of the code fails, the ROLLBACK statement rolls back any DML as well as message sends and receives. To test the code, you can execute the following statement:
EXEC Production.ProductModelUpdate 749, 'A Super Product'
Note
You might want to come up with a clever way of populating the body of the responding Production.CatalogChangeAckQueueReader stored procedure to deal with the incoming acknowledgment messages sent by the target.
Prioritizing Services
The next issue to consider
is conversation priority. A safe assumption is that some of your
software applications are more important than others. Likewise, some of
your SSB services are mission-critical (and should be configurable as
such) and others may be only for casual use. You can easily set a
numeric priority for these services so that when system resources are
scarce (during periods of high server and network utilization), messages
for your critical applications are processed first.
SQL Server 2008 offers a
simple syntax for accomplishing this. The following example illustrates
how to set the priority to level 1 for messages transmitted from our SSB
initiator (local) service to our target (remote) service:
CREATE BROKER PRIORITY CatChangeInitToTarget
FOR CONVERSATION
SET
(
CONTRACT_NAME =
[//samspublishing.com/SS2008/SSB/Contracts/BasicCatalogChangeContract],
LOCAL_SERVICE_NAME =
[//samspublishing.com/SS2008/SSB/Services/CatalogChangeInitiatorService],
REMOTE_SERVICE_NAME =
N'//samspublishing.com/SS2008/SSB/Services/CatalogChangeNotifyTarget',
PRIORITY_LEVEL = 1
)
Next, you need to learn how to set up Service Broker messaging applications to run on multiple instances of SQL Server.